package com.isyscore.os.etl.utils;

import com.isyscore.os.core.model.entity.JobConfig;
import com.isyscore.os.etl.model.dto.JobRunParamDTO;
import com.isyscore.os.etl.model.enums.DeployModeEnum;
import com.isyscore.os.etl.model.enums.JobType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CommandUtils {
    private static String appClassName;
    @Value("${flink-config.app-class-name}")
    public void setAppClassName(String className) {
        appClassName = className;
    }

    private static String innerJarName;
    @Value("${flink-config.inner-jar-name}")
    public void setInnerJarName(String jarName) {
        innerJarName = jarName;
    }

    private static String flinkHttpAddress;
    @Value("${flink-config.host}:${flink-config.port}")
    public void setFlinkHttpAddress(String address) {
        flinkHttpAddress = removeHttpProtocol(address);
    }

    public static String buildRunCommandForCluster(JobRunParamDTO jobRunParamDTO, JobConfig jobConfig,
                                                   String clientHome) throws Exception {
        StringBuilder command = new StringBuilder();
        command.append(jobRunParamDTO.getFlinkBinPath()).append(" run -d ");

        if (DeployModeEnum.LOCAL.name().equals(jobConfig.getDeployMode())) {
            command.append("-m ").append(flinkHttpAddress).append(" ");
        } else if (DeployModeEnum.STANDALONE.name().equals(jobConfig.getDeployMode())) {
            if (StringUtils.isNotEmpty(jobConfig.getFlinkRunConfig())) {
                command.append(jobConfig.getFlinkRunConfig());
            }
        }

        if (StringUtils.isNotEmpty(jobConfig.getExtJarPath())) {
            String[] urls = jobConfig.getExtJarPath().split(",");
            for (String url : urls) {
                command.append(" -C ").append(url.trim()).append(" ");
            }
        }
        switch (JobType.getJobType(jobConfig.getJobType())) {

            case SQL_BATCH:
            case VIEW:
            case SQL_STREAMING:
                command.append(" -c ").append(appClassName).append(" ");
                command.append(clientHome).append("/").append(innerJarName);
                command.append(" -sql ").append(jobRunParamDTO.getSqlPath()).append(" ");
                command.append(" -type ").append(jobConfig.getJobType()).append(" ");
                break;
            case JAR:
                command.append(" -c ").append(jobConfig.getCustomMainClass()).append(" ");
                command.append(jobRunParamDTO.getMainJarPath());
                command.append(" ").append(jobConfig.getCustomArgs());
                break;
        }

        log.info("buildRunCommandForLocal runCommand={}", command);
        return command.toString();
    }


    public static String parseFlinkHttpAddress(String flinkRunConfig) {
        if (flinkRunConfig == null) {
            return null;
        }
        String[] split = flinkRunConfig.split(" +");
        int i = 0;
        for (; i < split.length; i++) {
            if ("-m".equals(split[i])) {
                break;
            }
        }
        if (i+1 < split.length) {
            return split[i+1];
        } else {
            return null;
        }
    }

    public static String removeHttpProtocol(String url) {
        if (url.startsWith("http://") || url.startsWith("https://")) {
            return url.split("//")[1];
        }
        return url;
    }
}
